To check on the status, see https://github.com/OpenMined/PySyft/issues/2771.
This note will be removed as soon as Part B works again.
In the last few sections, we learned about encrypted computation by building several simple programs. In this section, we return to the Federated Learning demo of Part 4, where we had a "trusted aggregator" responsible for averaging the model updates from multiple workers.
We will now use our new tools for encrypted computation to remove this trusted aggregator, because it is less than ideal: it assumes we can find someone trustworthy enough to have access to this sensitive information, which is not always the case.
Thus, in this notebook, we show how to use SMPC to perform secure aggregation so that we no longer need a "trusted aggregator".
Authors:
Chinese version translated by:
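As a quick refresher on what the earlier sections built, the sketch below shows how additive secret sharing lets several parties sum values without any single party seeing them, which is the core idea behind the secure aggregation used later in this notebook. It is a minimal, plain-Python illustration only; the modulus Q and the helper names encrypt/decrypt/add are assumptions chosen for this example, not PySyft's internals.
In [ ]:
import random

Q = 2**31 - 1  # illustrative modulus for the shares

def encrypt(x, n_shares=2):
    # split integer x into n additive shares that sum to x modulo Q
    shares = [random.randrange(Q) for _ in range(n_shares - 1)]
    shares.append((x - sum(shares)) % Q)
    return shares

def decrypt(shares):
    # only the sum of all shares reveals the value
    return sum(shares) % Q

def add(a_shares, b_shares):
    # each party adds its own shares locally; no plaintext is ever exposed
    return [(a + b) % Q for a, b in zip(a_shares, b_shares)]

# two workers' model updates, already encoded as integers (e.g. 0.25 and 0.75 scaled by 1000)
bob_update, alice_update = 250, 750
aggregate_shares = add(encrypt(bob_update), encrypt(alice_update))
print(decrypt(aggregate_shares) / 1000 / 2)  # 0.5, the average, revealed only at the very end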
In [ ]:
import pickle
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
class Parser:
    """Parameters for training"""
    def __init__(self):
        self.epochs = 10
        self.lr = 0.001
        self.test_batch_size = 8
        self.batch_size = 8
        self.log_interval = 10
        self.seed = 1

args = Parser()
torch.manual_seed(args.seed)
kwargs = {}
In [ ]:
with open('../data/BostonHousing/boston_housing.pickle','rb') as f:
    ((X, y), (X_test, y_test)) = pickle.load(f)
X = torch.from_numpy(X).float()
y = torch.from_numpy(y).float()
X_test = torch.from_numpy(X_test).float()
y_test = torch.from_numpy(y_test).float()
# preprocessing
mean = X.mean(0, keepdim=True)
dev = X.std(0, keepdim=True)
mean[:, 3] = 0. # the feature at column 3 is binary,
dev[:, 3] = 1. # so we don't standardize it
X = (X - mean) / dev
X_test = (X_test - mean) / dev
train = TensorDataset(X, y)
test = TensorDataset(X_test, y_test)
train_loader = DataLoader(train, batch_size=args.batch_size, shuffle=True, **kwargs)
test_loader = DataLoader(test, batch_size=args.test_batch_size, shuffle=True, **kwargs)
In [ ]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(13, 32)
        self.fc2 = nn.Linear(32, 24)
        self.fc3 = nn.Linear(24, 1)

    def forward(self, x):
        x = x.view(-1, 13)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

model = Net()
optimizer = optim.SGD(model.parameters(), lr=args.lr)
In [ ]:
import syft as sy
hook = sy.TorchHook(torch)
bob = sy.VirtualWorker(hook, id="bob")
alice = sy.VirtualWorker(hook, id="alice")
james = sy.VirtualWorker(hook, id="james")
compute_nodes = [bob, alice]
Send the data to the workers. Usually the workers would already hold their own data; we send it manually here purely for demonstration purposes. (A small illustration of what .send() and .get() do follows the next cell.)
In [ ]:
train_distributed_dataset = []
for batch_idx, (data,target) in enumerate(train_loader):
    data = data.send(compute_nodes[batch_idx % len(compute_nodes)])
    target = target.send(compute_nodes[batch_idx % len(compute_nodes)])
    train_distributed_dataset.append((data, target))
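As a side note, here is a small, hedged illustration of what sending a tensor actually does: .send() moves the data to the chosen worker and returns a pointer tensor, .location tells you which worker holds the data, and .get() brings it back. The tensor x below is made up for the illustration; it is not part of the training flow.
In [ ]:
x = torch.tensor([1., 2., 3.])
x_ptr = x.send(bob)      # the data now lives on bob; x_ptr is only a pointer
print(x_ptr.location)    # the worker holding the data (bob)
print(x_ptr.get())       # fetches the tensor back to the local machine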
In [ ]:
def train(epoch):
    model.train()
    for batch_idx, (data,target) in enumerate(train_distributed_dataset):
        worker = data.location
        model.send(worker)
        optimizer.zero_grad()

        # update the model
        pred = model(data)
        loss = F.mse_loss(pred.view(-1), target)
        loss.backward()
        optimizer.step()
        model.get()

        if batch_idx % args.log_interval == 0:
            loss = loss.get()
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * data.shape[0], len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))
In [ ]:
def test():
    model.eval()
    test_loss = 0
    for data, target in test_loader:
        output = model(data)
        test_loss += F.mse_loss(output.view(-1), target, reduction='sum').item()  # sum up batch loss

    test_loss /= len(test_loader.dataset)
    print('\nTest set: Average loss: {:.4f}\n'.format(test_loss))
In [ ]:
import time
In [ ]:
t = time.time()

for epoch in range(1, args.epochs + 1):
    train(epoch)

total_time = time.time() - t
print('Total', round(total_time, 2), 's')
In [ ]:
test()
In [ ]:
remote_dataset = (list(),list())

train_distributed_dataset = []

for batch_idx, (data,target) in enumerate(train_loader):
    data = data.send(compute_nodes[batch_idx % len(compute_nodes)])
    target = target.send(compute_nodes[batch_idx % len(compute_nodes)])
    remote_dataset[batch_idx % len(compute_nodes)].append((data, target))

def update(data, target, model, optimizer):
    model.send(data.location)
    optimizer.zero_grad()
    pred = model(data)
    loss = F.mse_loss(pred.view(-1), target)
    loss.backward()
    optimizer.step()
    return model

bobs_model = Net()
alices_model = Net()

bobs_optimizer = optim.SGD(bobs_model.parameters(), lr=args.lr)
alices_optimizer = optim.SGD(alices_model.parameters(), lr=args.lr)

models = [bobs_model, alices_model]
params = [list(bobs_model.parameters()), list(alices_model.parameters())]
optimizers = [bobs_optimizer, alices_optimizer]
In [ ]:
# select which batch to train on
data_index = 0

# update the remote models
# we could iterate over this many times before moving on, but here each worker just does one iteration
for remote_index in range(len(compute_nodes)):
    data, target = remote_dataset[remote_index][data_index]
    models[remote_index] = update(data, target, models[remote_index], optimizers[remote_index])
In [ ]:
# create a list where we will deposit our encrypted, averaged model parameters
new_params = list()
In [ ]:
# iterate through each parameter
for param_i in range(len(params[0])):

    # for each worker
    spdz_params = list()
    for remote_index in range(len(compute_nodes)):

        # select the identical parameter from each worker and copy it
        copy_of_parameter = params[remote_index][param_i].copy()

        # since SMPC can only work with integers (not floats), we need
        # to store the decimal information using integers. In other words,
        # we need to use "fixed precision" encoding
        # (a small worked example of this encoding follows this cell).
        fixed_precision_param = copy_of_parameter.fix_precision()

        # now we encrypt it on the remote machine. Note that
        # fixed_precision_param is ALREADY a pointer, so calling share
        # actually encrypts the data being pointed TO. It returns a
        # pointer to the MPC secret-shared object, which is what we need.
        encrypted_param = fixed_precision_param.share(bob, alice, crypto_provider=james)

        # now we fetch the pointer to the MPC shared value
        param = encrypted_param.get()

        # save the parameter so we can average it with the same parameter
        # from the other workers
        spdz_params.append(param)

    # average the params from the workers, fetch the result to the local machine,
    # then decrypt and decode (from fixed precision) back into a floating point number
    new_param = (spdz_params[0] + spdz_params[1]).get().float_precision()/2

    # save the new averaged parameter
    new_params.append(new_param)
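To make the fixed-precision step above concrete, here is a rough, hand-rolled illustration of what encoding a float as an integer looks like. This is only a sketch of the idea, not PySyft's internal implementation; the choice of 3 decimal places is an assumption made for this example.
In [ ]:
precision_fractional = 3                               # assumed number of decimal places for the example
w = 0.12345
encoded = int(round(w * 10**precision_fractional))     # 123 -- an integer, so SMPC can operate on it
decoded = encoded / 10**precision_fractional           # 0.123 -- a small rounding error is expected
print(encoded, decoded)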
In [ ]:
with torch.no_grad():
    for model in params:
        for param in model:
            param *= 0

    for model in models:
        model.get()

    for remote_index in range(len(compute_nodes)):
        for param_index in range(len(params[remote_index])):
            params[remote_index][param_index].set_(new_params[param_index])
In [ ]:
def train(epoch):
    for data_index in range(len(remote_dataset[0])-1):
        # update remote models
        for remote_index in range(len(compute_nodes)):
            data, target = remote_dataset[remote_index][data_index]
            models[remote_index] = update(data, target, models[remote_index], optimizers[remote_index])

        # encrypted aggregation
        new_params = list()
        for param_i in range(len(params[0])):
            spdz_params = list()
            for remote_index in range(len(compute_nodes)):
                spdz_params.append(params[remote_index][param_i].copy().fix_precision().share(bob, alice, crypto_provider=james).get())

            new_param = (spdz_params[0] + spdz_params[1]).get().float_precision()/2
            new_params.append(new_param)

        # cleanup
        with torch.no_grad():
            for model in params:
                for param in model:
                    param *= 0

            for model in models:
                model.get()

            for remote_index in range(len(compute_nodes)):
                for param_index in range(len(params[remote_index])):
                    params[remote_index][param_index].set_(new_params[param_index])
In [ ]:
def test():
    models[0].eval()
    test_loss = 0
    for data, target in test_loader:
        output = models[0](data)
        test_loss += F.mse_loss(output.view(-1), target, reduction='sum').item()  # sum up batch loss

    test_loss /= len(test_loader.dataset)
    print('Test set: Average loss: {:.4f}\n'.format(test_loss))
In [ ]:
t = time.time()

for epoch in range(args.epochs):
    print(f"Epoch {epoch + 1}")
    train(epoch)
    test()

total_time = time.time() - t
print('Total', round(total_time, 2), 's')
Congratulations on completing this notebook tutorial! If you enjoyed it and would like to join the movement toward privacy-preserving, decentralized ownership of AI and the AI supply chain (data), you can do so in the following ways!
The easiest way to help our community is simply by starring the GitHub repositories! This helps raise awareness of the cool tools we're building.
The best way to keep up to date on the latest advancements is to join our community! You can do so by filling out the form at http://slack.openmined.org
The best way to contribute to our community is to become a code contributor! At any time you can go to the PySyft GitHub Issues page and filter for "Projects". This will show you an overview of all the projects you could join! If you don't want to join a project but would still like to do a bit of coding, you can also look for more "one off" mini-projects by searching for GitHub issues tagged "good first issue".
If you don't have time to contribute to our codebase but would still like to lend support, you can also become a Backer on our Open Collective. All donations go toward our web hosting and other community expenses such as hackathons and meetups!
In [ ]: